Skip to content

24 concurrent.futures并发执行

threading和multiprocessing虽然强大,但用起来有点繁琐——需要手动创建线程/进程、管理生命周期、收集结果。concurrent.futures提供了更高级的接口,用线程池和进程池简化并发编程。

一、ThreadPoolExecutor:线程池

1.1 基本用法

python
import time
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Sleep for one second, then return the square of n."""
    time.sleep(1)
    return n * n


# A pool of at most 3 worker threads; leaving the with-block waits for every task
with ThreadPoolExecutor(max_workers=3) as pool:
    # submit() schedules a single call and hands back a Future right away
    futures = [pool.submit(task, i) for i in range(5)]

    # result() blocks until that particular task has finished
    for fut in futures:
        print(fut.result())

1.2 map方法

更简洁的并行映射。

python
import time
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Simulate one second of work and return n squared."""
    time.sleep(1)
    return n * n


numbers = [1, 2, 3, 4, 5]

with ThreadPoolExecutor(max_workers=3) as pool:
    # map() runs task over every item concurrently, but yields the
    # results in input order rather than completion order
    results = list(pool.map(task, numbers))
    print(results)  # [1, 4, 9, 16, 25]

1.3 as_completed

按完成顺序获取结果。

python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(n):
    """Sleep for a random 0.5-2 seconds, then return n squared."""
    time.sleep(random.uniform(0.5, 2))
    return n * n


with ThreadPoolExecutor(max_workers=3) as pool:
    # Keep a Future -> input mapping so each result can be labelled
    futures = {pool.submit(task, i): i for i in range(5)}

    # as_completed() yields each Future as soon as it finishes, fastest first
    for finished in as_completed(futures):
        n = futures[finished]
        result = finished.result()
        print(f"任务 {n} 完成: {result}")

二、ProcessPoolExecutor:进程池

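用法和ThreadPoolExecutor基本一样,只是用进程代替线程,因此CPU密集型任务可以真正利用多核。需要注意两点:提交的函数、参数和返回值都必须能被pickle序列化(函数要定义在模块顶层);创建进程池的代码必须放在if __name__ == '__main__':之下。原因是在使用spawn启动方式的平台(如Windows、macOS)上,子进程会重新导入主模块。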

python
from concurrent.futures import ProcessPoolExecutor


def cpu_intensive(n):
    """Return the sum of i*i for i in range(n) -- a deliberately CPU-bound job."""
    return sum(i * i for i in range(n))


if __name__ == '__main__':
    # The guard is required for process pools: worker processes may re-import
    # this module, and must not create pools of their own when they do
    jobs = [10**6] * 8
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_intensive, jobs))
        print(results)

三、Future对象

3.1 获取结果

python
import time
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Return n squared after a one-second pause."""
    time.sleep(1)
    return n * n


with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)

    # done() never blocks; right after submit the task is still sleeping
    print(future.done())  # False

    # result() waits for the task to finish and hands back its return value
    result = future.result()
    print(result)  # 25

    # once result() has returned, the future is in its finished state
    print(future.done())  # True

3.2 超时

python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_task():
    """Simulate a task that takes 10 seconds before returning its result."""
    time.sleep(10)
    return "完成"


with ThreadPoolExecutor() as executor:
    # Pass the function object itself. submit(slow_task()) would run the task
    # right here in the main thread (blocking 10 s) and then try to schedule
    # its return value -- a str -- as the callable, failing with TypeError.
    future = executor.submit(slow_task)

    try:
        result = future.result(timeout=3)  # wait at most 3 seconds
    except TimeoutError:
        # The timeout only stops *waiting*: the task keeps running, and
        # leaving the with-block still waits for it to finish.
        print("任务超时!")

3.3 回调

python
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Return the square of n."""
    return n * n


def callback(future):
    """Print the result of a finished future; receives the Future as its only argument."""
    value = future.result()
    print(f"任务完成,结果: {value}")


with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)
    # callback fires once the future is done -- immediately, in this thread,
    # if the task has already finished when the callback is registered
    future.add_done_callback(callback)

3.4 取消任务

python
import time
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Sleep for one second, then return n squared."""
    time.sleep(1)
    return n * n


# A single worker thread: the first task occupies it, so the second one is
# guaranteed to still be waiting in the queue when we try to cancel it.
# (With the default pool size every task would start at once and cancel()
# would almost always return False.)
with ThreadPoolExecutor(max_workers=1) as executor:
    blocker = executor.submit(task, 1)
    future = executor.submit(task, 5)

    # cancel() only succeeds for tasks that have not started running yet
    cancelled = future.cancel()
    print(f"取消成功: {cancelled}")

四、wait()

更灵活的等待方式。

python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED


def task(n):
    """Sleep for n seconds and return n."""
    time.sleep(n)
    return n


delays = [3, 1, 2]

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, d) for d in delays]

    # FIRST_COMPLETED: return as soon as any one future finishes
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    print(f"已完成: {len(done)}, 未完成: {len(not_done)}")

    # ALL_COMPLETED: block until every future has finished
    done, not_done = wait(futures, return_when=ALL_COMPLETED)
    print(f"全部完成: {len(done)}")

return_when参数:

  • FIRST_COMPLETED:有一个完成就返回
  • FIRST_EXCEPTION:有任一任务抛出异常就返回;如果没有任何任务抛出异常,则等同于ALL_COMPLETED
  • ALL_COMPLETED:所有完成才返回(默认)

五、异常处理

5.1 捕获异常

python
from concurrent.futures import ThreadPoolExecutor


def risky_task(n):
    """Return n squared, except for n == 3, which raises ValueError."""
    if n != 3:
        return n * n
    raise ValueError(f"任务 {n} 出错了")


with ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_task, i) for i in range(5)]

    # An exception raised inside a task is stored on its Future and
    # re-raised by result(), so it can be handled in the calling thread
    for i, fut in enumerate(futures):
        try:
            result = fut.result()
        except Exception as e:
            print(f"任务 {i} 异常: {e}")
        else:
            print(f"任务 {i}: {result}")

5.2 map中的异常

注意:Executor.map没有asyncio.gather那样的return_exceptions参数。某个任务抛出的异常,会在迭代到该任务的结果时被重新抛出,之后的结果也就无法再取到。

python
from concurrent.futures import ThreadPoolExecutor


def risky_task(n):
    """Square n; n == 3 raises ValueError to demonstrate error propagation."""
    if n != 3:
        return n * n
    raise ValueError(f"任务 {n} 出错了")


with ThreadPoolExecutor() as executor:
    # map() submits every call up front; errors only surface while iterating
    outputs = executor.map(risky_task, range(5))
    # Iterating re-raises the first task exception (in input order),
    # so list() never finishes and `results` is never bound
    try:
        results = list(outputs)
    except ValueError as e:
        print(f"异常: {e}")

六、实战场景

6.1 并发请求

python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_url(url):
    """Pretend to fetch url (a one-second sleep) and return a fake response string."""
    time.sleep(1)  # stands in for network latency
    return f"{url} 的响应"


urls = [f"http://example.com/api/{i}" for i in range(10)]

with ThreadPoolExecutor(max_workers=5) as executor:
    # Future -> url, so each response can be matched to its request
    futures = {executor.submit(fetch_url, url): url for url in urls}

    # Handle responses in whatever order they arrive
    for done in as_completed(futures):
        url, result = futures[done], done.result()
        print(f"{url}: {result}")

6.2 批量文件处理

python
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def process_file(filepath):
    """Summarize a text file as "<file name>: <character count> 字符".

    Args:
        filepath: path of the file, as a ``Path`` or a plain ``str``.

    Returns:
        A string containing the file's name and the number of characters it holds.
    """
    path = Path(filepath)  # also accept plain string paths
    # Decode explicitly as UTF-8 rather than relying on the platform's default
    # encoding (e.g. a GBK code page on Chinese-locale Windows), which would
    # change the character count or raise UnicodeDecodeError
    content = path.read_text(encoding="utf-8")
    return f"{path.name}: {len(content)} 字符"


files = list(Path("./data").glob("*.txt"))

with ThreadPoolExecutor(max_workers=4) as executor:
    # map() returns the summaries in the same order as `files`
    results = list(executor.map(process_file, files))

for r in results:
    print(r)

6.3 超时控制

python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError


def task(n):
    """Sleep for n seconds and return n."""
    time.sleep(n)
    return n


with ThreadPoolExecutor() as executor:
    # Keep every duration clear of the 3 s deadline: a task sleeping exactly
    # 3 s would race the timeout and make the output nondeterministic
    futures = [executor.submit(task, i) for i in [1, 2, 5, 4]]

    try:
        # The timeout is measured from the as_completed() call, not per future
        for future in as_completed(futures, timeout=3):
            print(f"结果: {future.result()}")
    except TimeoutError:
        # Tasks still running are not stopped; leaving the with-block
        # waits for them to finish
        print("部分任务超时")

七、选择ThreadPoolExecutor还是ProcessPoolExecutor

| 场景 | 用什么 |
|---|---|
| I/O密集型(网络请求、文件读写) | ThreadPoolExecutor |
| CPU密集型(计算、数据处理) | ProcessPoolExecutor |
| 不确定 | ThreadPoolExecutor(更轻量) |

八、总结

concurrent.futures的核心:

| 组件 | 用途 |
|---|---|
| ThreadPoolExecutor | 线程池 |
| ProcessPoolExecutor | 进程池 |
| Future | 异步计算的结果 |
| as_completed() | 按完成顺序获取结果 |
| wait() | 灵活地等待任务 |

使用模式:

python
with ThreadPoolExecutor(max_workers=5) as executor:
    # Pattern 1: submit() each call yourself, then collect the Futures
    pending = [executor.submit(task, arg) for arg in args]
    for fut in pending:
        print(fut.result())

    # Pattern 2: map() when you only need the results, in input order
    results = list(executor.map(task, args))

对于常见的“提交任务、收集结果”场景,concurrent.futures是Python并发编程的首选方式,比直接使用threading和multiprocessing简单得多。